package com.fwmagic.dynamic_rule.datagen;

import com.alibaba.fastjson.JSON;
import com.fwmagic.dynamic_rule.bean.LogBean;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Properties;

/**
 * 功能：模拟器生成用户行为日志数据写入Kafka
 * {
 * "account":"157",
 * "appId":"cn.do",
 * "appVersion":"3.0",
 * "carrier":"中国移动",
 * "deviceId":"157",
 * "deviceType":"mi10",
 * "eventId":"thumbUp",
 * "ip":"152.33.68.90",
 * "latitude":56.29385792,
 * "longitude":120.329857234,
 * "netType":"wifi",
 * "osName":"android",
 * "osVersion":"9.0",
 * "properties":{
 * "productId":"10",
 * "pageId":"10"
 * },
 * "releaseChannel":"小米应用",
 * "resolution":"1024*2048",
 * "sessionId":"sessionid157",
 * "timeStamp":1616142957968
 * }
 */
public class ActionLogGen {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hd1:9092,hd2:9092,hd3:9092");
        //acks=0 配置适用于实现非常高的吞吐量 , acks=all 这是最安全的模式
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");


        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    KafkaProducer producer = new KafkaProducer(props);
                    while (true) {
                        logGen(producer);
                    }
                }
            }).start();
        }
    }

    private static void logGen(KafkaProducer producer) {
        LogBean logBean = new LogBean();
        final String account = StringUtils.leftPad(RandomUtils.nextInt(1, 50) + "", 6, "0");
        logBean.setAccount(account);
        logBean.setAppId("cn.doitedu.yinew");
        logBean.setAppVersion("2.5");
        logBean.setCarrier("中国移动");
        logBean.setDeviceId(account);
        logBean.setIp("10.102.36.88");
        logBean.setLatitude(RandomUtils.nextDouble(10.0, 52.0));
        logBean.setLongitude(RandomUtils.nextDouble(120.0, 160.0));
        logBean.setDeviceType("mi6");
        logBean.setNetType("5G");
        logBean.setOsName("android");
        logBean.setOsVersion("7.5");
        logBean.setReleaseChannel("小米应用市场");
        logBean.setResolution("2048*1024");
        logBean.setEventId(RandomStringUtils.randomAlphabetic(1).toUpperCase());
//        logBean.setEventId("A");

        HashMap<String, String> properties = new HashMap<>();
        for (int i = 0; i < RandomUtils.nextInt(1, 5); i++) {
            properties.put("p" + RandomUtils.nextInt(1, 10), "v" + RandomUtils.nextInt(1, 3));
        }

        logBean.setProperties(properties);
        logBean.setTimeStamp(System.currentTimeMillis());
        logBean.setSessionId(RandomStringUtils.randomNumeric(10, 10));

        final String logJson = JSON.toJSONString(logBean);
        System.out.println(logJson);

        final ProducerRecord<String, String> record = new ProducerRecord<String, String>("yinew_applog", logJson);
        producer.send(record);

        try {
            Thread.sleep(300);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
